package org.xqh.test.yzs.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * @ClassName MqttOptCallback
 * @Description mqtt callback
 * @Author xuqianghui
 * @Date 2019/8/20 19:24
 * @Version 1.0
 */
@Slf4j
public class MqttOptCallback implements MqttCallback {

    private MqttClient mqttClient;

    private MqttConnectOptions options;

    private boolean isSub;

    public MqttOptCallback(MqttClient client, MqttConnectOptions options, boolean isSub){
        this.mqttClient = client;
        this.options = options;
        this.isSub = isSub;
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("disconnect mqtt server.", throwable);
        if(isSub){
            while (!this.mqttClient.isConnected()){
                log.info("start connect to mqtt server.");
                mqttClient.setCallback(this);
                try {
                    mqttClient.connect(options);
                } catch (MqttException e) {
                    log.error("", e);
                }
                if(!this.mqttClient.isConnected()){
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        log.error("", e);
                    }
                }
            }
            log.info("connect to mqtt server complete..");
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        String printMsg = String.format("receive msg: %s, topic: %s", new String(mqttMessage.getPayload(), "UTF-8"), topic);
        System.out.println(printMsg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发布成功.........");
    }
}
